데이터 제공 사이트 활용
다양한 데이터 제공 사이트를 활용하면 분석에 필요한 고품질 데이터를 효율적으로 확보할 수 있다. 공공 데이터부터 전문 데이터셋까지, 각 사이트의 특성을 이해하고 활용하는 방법을 알아보자.
공공데이터 포털 완전 활용
공공데이터포털(data.go.kr) 기본 활용
import requests
import pandas as pd
import json
from urllib.parse import urlencode
class PublicDataAPI:
def __init__(self, service_key):
self.service_key = service_key
self.base_url = "http://apis.data.go.kr"
def get_population_data(self, region_code, year):
"""인구 통계 데이터 조회"""
endpoint = "/1160100/service/GetPopulationKoreanService/getPopulationKorean"
params = {
'serviceKey': self.service_key,
'numOfRows': 100,
'pageNo': 1,
'resultType': 'json',
'admCode': region_code,
'year': year
}
url = self.base_url + endpoint + '?' + urlencode(params)
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if 'response' in data and 'body' in data['response']:
items = data['response']['body']['items']['item']
df = pd.DataFrame(items)
return df
else:
print("데이터를 찾을 수 없습니다.")
return pd.DataFrame()
except requests.RequestException as e:
print(f"API 요청 오류: {e}")
return pd.DataFrame()
def get_business_data(self, business_type, region):
"""사업체 통계 데이터 조회"""
# 실제 API 엔드포인트에 맞게 구현
pass
# 사용 예시
# api = PublicDataAPI('your_service_key_here')
# population_df = api.get_population_data('11000', '2023')
서울시 열린데이터광장 활용
import requests
import pandas as pd
class SeoulOpenData:
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "http://openapi.seoul.go.kr:8088"
def get_subway_passenger_data(self, start_date, end_date):
"""지하철 승하차 데이터 조회"""
service_name = "CardSubwayStatsNew"
url = f"{self.base_url}/{self.api_key}/json/{service_name}/1/1000/{start_date}/{end_date}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if service_name in data:
items = data[service_name]['row']
df = pd.DataFrame(items)
# 데이터 타입 변환
numeric_cols = ['RIDE_PASGR_NUM', 'ALIGHT_PASGR_NUM']
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
return df
else:
print("데이터를 찾을 수 없습니다.")
return pd.DataFrame()
except requests.RequestException as e:
print(f"API 요청 오류: {e}")
return pd.DataFrame()
def get_air_quality_data(self, district):
"""대기질 데이터 조회"""
service_name = "RealtimeCityAir"
url = f"{self.base_url}/{self.api_key}/json/{service_name}/1/25"
try:
response = requests.get(url)
data = response.json()
if service_name in data:
items = data[service_name]['row']
df = pd.DataFrame(items)
if district:
df = df[df['MSRSTE_NM'].str.contains(district, na=False)]
return df
else:
return pd.DataFrame()
except Exception as e:
print(f"오류 발생: {e}")
return pd.DataFrame()
# 사용 예시
# seoul_api = SeoulOpenData('your_api_key_here')
# subway_data = seoul_api.get_subway_passenger_data('20231201', '20231231')
# air_data = seoul_api.get_air_quality_data('강남구')
Kaggle 데이터셋 다운로드 자동화
Kaggle API 설정 및 활용
import kaggle
import pandas as pd
import os
import zipfile
class KaggleDataManager:
def __init__(self):
# Kaggle API 키 설정 필요 (~/.kaggle/kaggle.json)
self.api = kaggle.KaggleApi()
self.api.authenticate()
def search_datasets(self, keyword, max_size_mb=100):
"""키워드로 데이터셋 검색"""
datasets = self.api.dataset_list(search=keyword, max_size=max_size_mb*1024*1024)
dataset_info = []
for dataset in datasets:
dataset_info.append({
'title': dataset.title,
'ref': dataset.ref,
'size_mb': dataset.totalBytes / (1024*1024) if dataset.totalBytes else 0,
'download_count': dataset.downloadCount,
'vote_count': dataset.voteCount,
'usability_rating': dataset.usabilityRating
})
return pd.DataFrame(dataset_info).sort_values('usability_rating', ascending=False)
def download_dataset(self, dataset_ref, extract_path='./kaggle_data'):
"""데이터셋 다운로드 및 압축 해제"""
try:
# 디렉토리 생성
os.makedirs(extract_path, exist_ok=True)
# 데이터셋 다운로드
self.api.dataset_download_files(dataset_ref, path=extract_path, unzip=True)
print(f"다운로드 완료: {dataset_ref}")
# 다운로드된 파일 목록 반환
files = []
for root, dirs, filenames in os.walk(extract_path):
for filename in filenames:
files.append(os.path.join(root, filename))
return files
except Exception as e:
print(f"다운로드 오류: {e}")
return []
def auto_load_csv_files(self, dataset_ref):
"""데이터셋 다운로드 후 CSV 파일들을 자동으로 로드"""
files = self.download_dataset(dataset_ref)
dataframes = {}
for file_path in files:
if file_path.endswith('.csv'):
try:
filename = os.path.basename(file_path)
df = pd.read_csv(file_path)
dataframes[filename] = df
print(f"로드 완료: {filename} - {df.shape}")
except Exception as e:
print(f"파일 로드 오류 {file_path}: {e}")
return dataframes
def get_competition_data(self, competition_name):
"""Kaggle 경진대회 데이터 다운로드"""
try:
self.api.competition_download_files(competition_name, path='./competition_data')
print(f"경진대회 데이터 다운로드 완료: {competition_name}")
# ZIP 파일 압축 해제
zip_path = f"./competition_data/{competition_name}.zip"
if os.path.exists(zip_path):
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall('./competition_data')
os.remove(zip_path)
except Exception as e:
print(f"경진대회 데이터 다운로드 오류: {e}")
# 사용 예시
# kaggle_manager = KaggleDataManager()
#
# # 데이터셋 검색
# datasets = kaggle_manager.search_datasets('house prices', max_size_mb=50)
# print(datasets.head())
#
# # 데이터셋 다운로드 및 로드
# dataframes = kaggle_manager.auto_load_csv_files('paultimothymooney/chest-xray-pneumonia')
금융 데이터 API
yfinance를 활용한 주식 데이터 수집
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
class FinanceDataCollector:
def __init__(self):
pass
def get_stock_data(self, symbols, period='1y', interval='1d'):
"""주식 데이터 수집"""
if isinstance(symbols, str):
symbols = [symbols]
stock_data = {}
for symbol in symbols:
try:
ticker = yf.Ticker(symbol)
data = ticker.history(period=period, interval=interval)
stock_data[symbol] = data
print(f"데이터 수집 완료: {symbol}")
except Exception as e:
print(f"데이터 수집 오류 {symbol}: {e}")
return stock_data
def get_multiple_stocks_comparison(self, symbols, start_date, end_date):
"""여러 주식 비교 데이터"""
data = yf.download(symbols, start=start_date, end=end_date)
# 종가만 추출하여 비교
if len(symbols) > 1:
close_prices = data['Close']
else:
close_prices = data['Close'].to_frame(symbols[0])
# 수익률 계산
returns = close_prices.pct_change().dropna()
# 누적 수익률 계산
cumulative_returns = (1 + returns).cumprod()
return {
'prices': close_prices,
'returns': returns,
'cumulative_returns': cumulative_returns
}
def get_market_indices(self):
"""주요 시장 지수 데이터"""
indices = {
'S&P 500': '^GSPC',
'NASDAQ': '^IXIC',
'DOW': '^DJI',
'KOSPI': '^KS11',
'KOSDAQ': '^KQ11'
}
index_data = {}
for name, symbol in indices.items():
try:
ticker = yf.Ticker(symbol)
data = ticker.history(period='1y')
index_data[name] = data
except Exception as e:
print(f"지수 데이터 수집 오류 {name}: {e}")
return index_data
def get_company_info(self, symbol):
"""기업 정보 조회"""
ticker = yf.Ticker(symbol)
info = ticker.info
# 주요 정보만 추출
key_info = {
'company_name': info.get('longName', 'N/A'),
'sector': info.get('sector', 'N/A'),
'industry': info.get('industry', 'N/A'),
'market_cap': info.get('marketCap', 'N/A'),
'pe_ratio': info.get('trailingPE', 'N/A'),
'dividend_yield': info.get('dividendYield', 'N/A'),
'beta': info.get('beta', 'N/A')
}
return key_info
# 사용 예시
# finance_collector = FinanceDataCollector()
#
# # 개별 주식 데이터
# apple_data = finance_collector.get_stock_data('AAPL', period='2y')
#
# # 여러 주식 비교
# comparison = finance_collector.get_multiple_stocks_comparison(
# ['AAPL', 'GOOGL', 'MSFT'],
# '2023-01-01',
# '2023-12-31'
# )
#
# # 시장 지수 데이터
# indices = finance_collector.get_market_indices()
한국투자증권 API 활용
import requests
import pandas as pd
import json
class KISDataAPI:
def __init__(self, app_key, app_secret):
self.app_key = app_key
self.app_secret = app_secret
self.base_url = "https://openapi.koreainvestment.com:9443"
self.access_token = None
self.get_access_token()
def get_access_token(self):
"""접근 토큰 발급"""
url = f"{self.base_url}/oauth2/tokenP"
headers = {
"content-type": "application/json"
}
data = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecret": self.app_secret
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data))
response.raise_for_status()
result = response.json()
self.access_token = result['access_token']
print("접근 토큰 발급 완료")
except Exception as e:
print(f"토큰 발급 오류: {e}")
def get_stock_price(self, stock_code):
"""주식 현재가 조회"""
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price"
headers = {
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": "FHKST01010100"
}
params = {
"fid_cond_mrkt_div_code": "J",
"fid_input_iscd": stock_code
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
if data['rt_cd'] == '0':
output = data['output']
return {
'stock_code': stock_code,
'stock_name': output['hts_kor_isnm'],
'current_price': int(output['stck_prpr']),
'change_rate': float(output['prdy_ctrt']),
'volume': int(output['acml_vol'])
}
else:
print(f"데이터 조회 실패: {data['msg1']}")
return None
except Exception as e:
print(f"주식 가격 조회 오류: {e}")
return None
# 사용 예시 (실제 API 키 필요)
# kis_api = KISDataAPI('your_app_key', 'your_app_secret')
# samsung_price = kis_api.get_stock_price('005930') # 삼성전자
소셜미디어 데이터 수집 기초
Twitter API v2 활용
import tweepy
import pandas as pd
from datetime import datetime, timedelta
class TwitterDataCollector:
def __init__(self, bearer_token):
self.client = tweepy.Client(bearer_token=bearer_token)
def search_tweets(self, query, max_results=100, lang='ko'):
"""트윗 검색"""
try:
tweets = tweepy.Paginator(
self.client.search_recent_tweets,
query=query,
max_results=max_results,
tweet_fields=['created_at', 'author_id', 'public_metrics', 'lang']
).flatten(limit=max_results)
tweet_data = []
for tweet in tweets:
if tweet.lang == lang: # 언어 필터링
tweet_data.append({
'id': tweet.id,
'text': tweet.text,
'created_at': tweet.created_at,
'author_id': tweet.author_id,
'retweet_count': tweet.public_metrics['retweet_count'],
'like_count': tweet.public_metrics['like_count'],
'reply_count': tweet.public_metrics['reply_count']
})
return pd.DataFrame(tweet_data)
except Exception as e:
print(f"트윗 검색 오류: {e}")
return pd.DataFrame()
def analyze_tweet_sentiment(self, df):
"""간단한 감정 분석 (키워드 기반)"""
positive_words = ['좋다', '훌륭하다', '최고', '감사', '행복', '만족']
negative_words = ['나쁘다', '최악', '실망', '화나다', '짜증', '불만']
def get_sentiment(text):
positive_count = sum(1 for word in positive_words if word in text)
negative_count = sum(1 for word in negative_words if word in text)
if positive_count > negative_count:
return 'positive'
elif negative_count > positive_count:
return 'negative'
else:
return 'neutral'
df['sentiment'] = df['text'].apply(get_sentiment)
return df
# 사용 예시 (실제 Bearer Token 필요)
# twitter_collector = TwitterDataCollector('your_bearer_token')
# tweets_df = twitter_collector.search_tweets('#데이터분석', max_results=50)
# analyzed_tweets = twitter_collector.analyze_tweet_sentiment(tweets_df)
데이터 품질 검증 및 전처리
수집된 데이터 품질 체크
def validate_collected_data(df, source_name):
"""수집된 데이터의 품질을 검증"""
quality_report = {
'source': source_name,
'total_rows': len(df),
'total_columns': len(df.columns),
'missing_data': {},
'data_types': {},
'duplicates': 0,
'quality_score': 0
}
# 결측치 확인
for col in df.columns:
missing_count = df[col].isnull().sum()
missing_percent = (missing_count / len(df)) * 100
quality_report['missing_data'][col] = {
'count': missing_count,
'percentage': round(missing_percent, 2)
}
# 데이터 타입 확인
quality_report['data_types'] = df.dtypes.to_dict()
# 중복 데이터 확인
quality_report['duplicates'] = df.duplicated().sum()
# 품질 점수 계산 (간단한 예시)
missing_penalty = sum([info['percentage'] for info in quality_report['missing_data'].values()]) / len(df.columns)
duplicate_penalty = (quality_report['duplicates'] / len(df)) * 100
quality_report['quality_score'] = max(0, 100 - missing_penalty - duplicate_penalty)
return quality_report
def clean_collected_data(df):
"""수집된 데이터 기본 정리"""
# 1. 중복 제거
df_cleaned = df.drop_duplicates()
# 2. 공백 문자 정리
for col in df_cleaned.select_dtypes(include=['object']).columns:
df_cleaned[col] = df_cleaned[col].astype(str).str.strip()
# 3. 날짜 컬럼 자동 감지 및 변환
for col in df_cleaned.columns:
if 'date' in col.lower() or 'time' in col.lower():
try:
df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')
except:
pass
# 4. 숫자 컬럼 자동 감지 및 변환
for col in df_cleaned.select_dtypes(include=['object']).columns:
try:
# 숫자로 변환 가능한지 확인
numeric_series = pd.to_numeric(df_cleaned[col], errors='coerce')
if numeric_series.notna().sum() > len(df_cleaned) * 0.8: # 80% 이상이 숫자라면
df_cleaned[col] = numeric_series
except:
pass
return df_cleaned
# 사용 예시
# quality_report = validate_collected_data(kaggle_df, 'Kaggle Dataset')
# cleaned_df = clean_collected_data(kaggle_df)
정리
다양한 데이터 제공 사이트를 효과적으로 활용하면 분석에 필요한 풍부한 데이터를 확보할 수 있다. 각 사이트의 API 특성을 이해하고, 데이터 품질을 검증하며, 적절한 전처리를 수행하는 것이 중요하다. 특히 공공 데이터와 금융 데이터는 신뢰할 수 있는 분석의 기반이 되므로 적극 활용하는 것이 좋다.
다음 섹션에서는 AI를 활용한 데이터 수집에 대해 알아보겠다.